﻿#include "zmqsubscriber.h"
#include <QDebug>
#include <QElapsedTimer>
#include <QtConcurrent/QtConcurrent>
#include "zlib.h"


// Constructs the subscriber. The body does nothing; `parent` is simply handed
// on to the BaseSubscriber constructor.
ZmqSubscriber::ZmqSubscriber(QObject *parent)
    : BaseSubscriber(parent)
{
}

// Publishing stub: the payload is never used and nothing is sent.
//
// data   - payload supplied by the caller; ignored.
// return - always true.
bool ZmqSubscriber::Publish(QByteArray data)
{
    static_cast<void>(data);   // silence the unused-parameter warning
    const bool accepted = true;
    return accepted;
}

// Sets up the SUB socket and then starts the receive loop via QtConcurrent.
//
// The loop is launched whether or not the socket setup succeeded; the setup
// result is only reported back to the caller.
//
// return - the value returned by initSUBModuleConnect().
bool ZmqSubscriber::Connect()
{
    const bool socketReady = initSUBModuleConnect();

    // Hand run() to QtConcurrent; the returned future is not kept.
    QtConcurrent::run([this]() { run(); });

    return socketReady;
}

// Reports the connection state. No actual check is performed: the result is
// unconditionally true, whatever the state of the socket.
bool ZmqSubscriber::IsConnected()
{
    const bool connected = true;
    return connected;
}

// Asks the receive loop to stop by clearing isRun, the flag that run() tests
// at the top of every iteration. This call only writes the flag: it does not
// wait for the loop to finish and does not close the socket or the context.
void ZmqSubscriber::Dispose()
{
    isRun = false;
}

bool ZmqSubscriber::initSUBModuleConnect()
{
    _context = zmq_ctx_new();
    _socket = zmq_socket(_context, ZMQ_SUB);
    qDebug()<< _communicationParam.toUtf8();
    //"tcp://localhost:5556"
    if(zmq_connect(_socket, _communicationParam.toUtf8()) < 0) {
         qDebug()<<"Failed to open the socket "<<_communicationParam;
         return false;
    }
    //重要：设置订阅主题，否则接收不到数据
    if(zmq_setsockopt(_socket, ZMQ_SUBSCRIBE, "", 0) < 0) {
        qDebug()<<"Failed to setsocketopt ZMQ_SUBSCRIBE error";
        return false;
    }

    int mstimeout = 3000;
    if(zmq_setsockopt(_socket, ZMQ_RCVTIMEO, &mstimeout, sizeof(int)) < 0) {
        qDebug()<<"Failed to setsocketopt ZMQ_RCVTIMEO error";
        return false;
    }

    //设置保活机制
    int tcp_keep_alive = 1;
    if(zmq_setsockopt(_socket, ZMQ_TCP_KEEPALIVE, &tcp_keep_alive, sizeof(tcp_keep_alive)) < 0) {
        qDebug()<<"Failed to setsocketopt ZMQ_TCP_KEEPALIVE error";
        return false;
    }
    return true;
}

void ZmqSubscriber::run()
{
    isRun = true;
    int RECEIVE_MSG_MAX_LENGTH=1024*1024;
    unsigned char *receivetMsg = new unsigned char[RECEIVE_MSG_MAX_LENGTH];
    QElapsedTimer waittime;
    waittime.start();
    while(isRun) {
        QThread::msleep(50);
        if(_socket) {
            int len = zmq_recv(_socket, receivetMsg, RECEIVE_MSG_MAX_LENGTH * sizeof(unsigned char),0);
            if(len < 0) {
               QThread::msleep(500);
            } else {
                QByteArray data((char*)(receivetMsg), len);
                emit Receive(data);
                waittime.start();
            }

            if(waittime.elapsed() > 1000*60*5) {//超过5分钟没有数据，则重新连接
                qDebug()<<__FUNCTION__<<__LINE__<<"no data arrive for longtime!";
                waittime.restart();
                initSUBModuleConnect();    //测试是否是导致数据冗余原因，暂时注释
            }
        } else {
            qDebug()<<__FUNCTION__<<__LINE__<<"SUB comm is error ,reconnectting...";
//            initSUBModuleConnect();
        }
    }
    delete [] receivetMsg;
}
